package com.fudan.run.dataset;

import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.fudan.cfg.base.DatasetDef;
import com.fudan.cfg.dataset.ElasticsearchDataset;
import com.fudan.run.JobRunner;
import com.fudan.run.ctx.annotation.DatasetAdapter;

@DatasetAdapter("elasticsearch")
public class ElasticsearchRdd extends DatasetHandler<ElasticsearchDataset>{

	@Override
	public JavaRDD<Map<String, Object>> rdd(JobRunner context,ElasticsearchDataset elasticsearchDataset) {
		return JavaEsSpark.esRDD(context.getJavaSparkContext(), elasticsearchDataset.getSource()).values();
	}

}
